package p2p

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"github.com/libp2p/go-libp2p/core/network"
	peer "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
	"osiris/dto"
	"osiris/logger"
	"osiris/ocrypto/ed25519"
	"sync"
)

// PingProtocol 握手协议：交换公钥、multiaddr
type PingProtocol struct{}

func (protocol PingProtocol) protocolName() protocol.ID {
	return "/p2p/ping"
}

func (protocol PingProtocol) openStream(peerID peer.ID, jsonStr string, wg *sync.WaitGroup) {
	s, err := ha.NewStream(context.Background(), peerID, protocol.protocolName())
	if err != nil {
		if wg != nil {
			wg.Done()
		}
		logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Open stream %s] Ha.NewStream", protocol.protocolName()): err})
		return
	} else {
		logger.Printf("Sucessfully open stream %s with remote peer %s\n", protocol.protocolName(), peerID.String())
		logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Open stream %s] Ha.NewStream", protocol.protocolName()): "success"})
	}

	// Create a buffered stream so that read and writes are non blocking.
	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

	// 不可能同时读和写，就开一个协程
	go func() {
		errorOccuredBeforeEcho := true
		defer func() {
			if wg != nil {
				wg.Done()
			}
			if errorOccuredBeforeEcho {
				s.Close()
			}
		}()

		rw.WriteString(fmt.Sprintf("%s\n", jsonStr))
		rw.Flush()
		errorOccuredBeforeEcho = false

		//等待ECHO,或网络流被对方节点关闭
		str, err2 := rw.ReadString('\n')
		if err2 != nil {
			//TODO confirm: what exactly will happen if the remote side close the stream?
			//if err == io.EOF {}
			logger.Printf("Stream %s is closed by the remote side\n", protocol.protocolName())
			logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Open Stream %s]", protocol.protocolName()): "Stream closed by the remote side"})
			return
		}

		//读完ECHO关流，处理Dto
		s.Close()
		logger.Printf("Stream %s is closed by the local side\n", protocol.protocolName())
		logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Open Stream %s]", protocol.protocolName()): "Stream closed by the local side"})

		var peerDto dto.PeerDto
		if err3 := json.Unmarshal([]byte(str), &peerDto); err3 != nil {
			logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Open Stream %s] PeerDto unmarshal", protocol.protocolName()): err3})
			return
		}

		//验证DtoHash、节点签名
		if !peerDto.Verify() {
			logger.Warn(map[string]interface{}{fmt.Sprintf("[p2p] [Open stream %s] verify peer", protocol.protocolName()): "fail"})
			return
		}

		StoreSenderPeer(peerDto)
	}()
}

// handlePing 处理协议为PingProtocol的网络流
//
//	@param s network.Stream
func (protocol PingProtocol) handleStream(s network.Stream) {
	logger.Println("Got a new stream " + protocol.protocolName())
	logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Handle Stream %s]", protocol.protocolName()): "Got a new stream "})

	// Create a buffer stream for non blocking read and write.
	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
	wg := new(sync.WaitGroup)
	wg.Add(2)
	verifyCh := make(chan bool)

	//读协程
	go func() {
		//发生错误时return将主动关闭流
		errorOccured := true
		defer func() {
			wg.Done()
			if !errorOccured {
				return
			}

			s.Close()
			logger.Printf("Close stream %s by the local side", protocol.protocolName())
			logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Handle Stream %s]", protocol.protocolName()): "close stream by the local side"})
		}()

		str, err := rw.ReadString('\n')
		if err != nil {
			logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle Stream %s] read string", protocol.protocolName()): err})
			return
		}

		var peerDto dto.PeerDto
		if err1 := json.Unmarshal([]byte(str), &peerDto); err1 != nil {
			logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] PeerDto unmarshal", protocol.protocolName()): err1})
			return
		}

		//验证DtoHash、节点签名
		if !peerDto.Verify() {
			verifyCh <- false
			logger.Warn(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] verify peer", protocol.protocolName()): "fail"})
			return
		}

		//验证通过的话通知写协程返回自己的PeerDto，同时存储对方的节点信息
		verifyCh <- true
		errorOccured = false
		StoreSenderPeer(peerDto)
	}()

	//写协程
	go func() {
		defer wg.Done()

		valid := <-verifyCh
		if !valid {
			return
		}

		peerDto := dto.PeerDto{
			BaseDto: dto.BaseDto{
				Code: dto.CODE_PONG,
			},
			SenderPeerID:     ha.ID().String(),
			SenderMultiaddrs: GetSelfMultiaddrStrs(),
			SenderPubKey:     ed25519.GetPeerPubKeyStr(),
		}
		err1 := peerDto.HashAndSign()
		if err1 != nil {
			logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] echo: sign peer", protocol.protocolName()): err1})
			return
		}

		peerDtoJsonBytes, err2 := json.Marshal(peerDto)
		if err2 != nil {
			logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] PeerDto marshal", protocol.protocolName()): err2.Error()})
			return
		}

		rw.WriteString(fmt.Sprintf("%s\n", string(peerDtoJsonBytes)))
		rw.Flush()

		//等待网络流被对方节点关闭
		_, err := rw.ReadString('\n')
		if err != nil {
			//TODO confirm: what exactly will happen if the remote side close the stream?
			//if err == io.EOF {}
			logger.Printf("Stream %s is closed by the remote side\n", protocol.protocolName())
			logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Handle Stream %s]", protocol.protocolName()): "Stream closed by the remote side"})
			return
		}
	}()

	wg.Wait()
}
